【CDK】AppFlowでSalesforceからS3へデータロード
はじめに
データ事業本部ビッグデータチームのkasamaです。
今回はAppFlowでSalesforceからS3へデータをロードする実装をCDKで行いたいと思います。
前提
今回実現したい構成は以下になります。
- Amazon AppFlowによりSalesforceからオブジェクトを取得し、S3にロードします。
- 失敗した場合はEventBridgeのイベントトリガーでエラー通知します。
事前にSalesforceのDeveloper Editionに登録していることとします。まだの方は以下が参考になります。
今回はSalesforceの取引先(Account)オブジェクトを対象とします。
今回の実装コードについては、Github上に格納してあるのでご確認いただければと思います。
43_appflow_sf_to_s3# tree
.
|-- README.md
|-- bin
| `-- app.ts
|-- cdk.json
|-- cloudformation
| `-- s3.yaml
|-- jest.config.js
|-- lib
| |-- salesforce-account-flow.ts
| `-- salesforce-appflow-stack.ts
|-- package-lock.json
|-- package.json
|-- parameter.ts
|-- sql
| |-- cm_kasama_db.sql
| `-- cm_kasama_salesforce_dev_db.raw_account.sql
|-- test
| `-- 43_appflow_sf_to_s3.test.ts
`-- tsconfig.json
8 directories, 20 files
AppFlowコネクタをAWS Management Consoleから作成
事前にSalesforce上で接続アプリケーション設定をすることで、AppFlowコネクタもCDK or Cfnで実装することは可能ですが、今回はなるべく簡易的にしたいため、AWS Management Consoleから作成します。
手動でコネクタ作成するためには、Amazon AppFlowの接続画面からコネクタにSalesforce
を作成し、接続を作成
をクリックします。
接続名は任意とし、Salesforce 環境はProduction
を選択します。その他はデフォルトのままで接続する
をクリックします。
次にアクセス許可の画面となるので、許可することでコネクタが作成されます。
実装
cloudformation/s3.yaml
AWSTemplateFormatVersion: '2010-09-09'
Description: 'S3 bucket for AppFlow with necessary permissions'
Resources:
S3SalesforceRawDataBucket:
Type: AWS::S3::Bucket
Properties:
BucketName: <your-s3-bucket-name>
BucketEncryption:
ServerSideEncryptionConfiguration:
- ServerSideEncryptionByDefault:
SSEAlgorithm: AES256
PublicAccessBlockConfiguration:
BlockPublicAcls: true
BlockPublicPolicy: true
IgnorePublicAcls: true
RestrictPublicBuckets: true
S3SalesforceRawDataBucketPolicy:
Type: AWS::S3::BucketPolicy
Properties:
Bucket: !Ref S3SalesforceRawDataBucket
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Principal:
Service: appflow.amazonaws.com
Action:
- s3:PutObject
- s3:GetBucketAcl
- s3:PutObjectAcl
- s3:AbortMultipartUpload
- s3:ListMultipartUploadParts
- s3:ListBucketMultipartUploads
Resource:
- !Sub arn:aws:s3:::${S3SalesforceRawDataBucket}
- !Sub arn:aws:s3:::${S3SalesforceRawDataBucket}/*
Condition:
StringEquals:
aws:SourceAccount: !Ref AWS::AccountId
Outputs:
S3SalesforceRawDataBucketName:
Value: !Ref S3SalesforceRawDataBucket
Export:
Name: <your-s3-bucket-name>
データを格納するS3 Bucketをyamlファイルで定義しています。CDKで定義しても良かったですが、簡単に定義できることを考慮し、yamlにしています。
bin/app.ts
#!/usr/bin/env node
import * as cdk from "aws-cdk-lib";
import { AppFlowStack } from "../lib/salesforce-appflow-stack";
import { devParameter, prodParameter } from "../parameter";
const app = new cdk.App();
const envKey = app.node.tryGetContext("environment") ?? "dev"; // default: dev
let parameter;
if (envKey === "dev") {
parameter = devParameter;
} else {
parameter = prodParameter;
}
new AppFlowStack(app, `CMKasamaAppFlow${envKey.toUpperCase()}`, {
description: `${parameter.projectName}-${parameter.envName}-test-tag`,
env: {
account: parameter.env?.account || process.env.CDK_DEFAULT_ACCOUNT,
region: parameter.env?.region || process.env.CDK_DEFAULT_REGION,
},
tags: {
Repository: `${parameter.projectName}-${parameter.envName}-test-tag`,
Environment: parameter.envName,
},
projectName: parameter.projectName,
envName: parameter.envName,
});
app.ts
ではスタックを定義しています。環境変数はparameter.ts
から参照しています。
lib/salesforce-account-flow.ts
import * as appflow from "aws-cdk-lib/aws-appflow";
import * as events from "aws-cdk-lib/aws-events";
import * as targets from "aws-cdk-lib/aws-events-targets";
import type * as s3 from "aws-cdk-lib/aws-s3";
import type * as sns from "aws-cdk-lib/aws-sns";
import { Construct } from "constructs";
// フロー設定
const FLOW_CONFIG = {
objectName: "Account",
flowStatus: "Suspended",
schedule: {
startTime: "2099-01-01T00:00:00+09:00",
expression: "rate(1days)",
offset: 0,
},
s3Prefix: "sf-account-flow",
};
export interface SalesforceFlowProps {
envName: string;
projectName: string;
outDataBucket: s3.IBucket;
salesforceConnectorProfile: string;
errorNotificationTopic: sns.ITopic;
}
export class SalesforceAccountFlow extends Construct {
constructor(scope: Construct, id: string, props: SalesforceFlowProps) {
super(scope, id);
// AppFlowのフロー名
const appFlowName = `${props.projectName}-${props.envName}-${FLOW_CONFIG.s3Prefix}`;
// AppFlowのフロー定義
const flow = new appflow.CfnFlow(this, "SalesforceAccountFlow", {
flowName: appFlowName,
destinationFlowConfigList: [
{
connectorType: "S3",
destinationConnectorProperties: {
s3: {
bucketName: props.outDataBucket.bucketName,
s3OutputFormatConfig: {
fileType: "PARQUET",
aggregationConfig: {
aggregationType: "None",
},
prefixConfig: {
prefixType: "PATH",
prefixFormat: "DAY",
},
preserveSourceDataTyping: true,
},
},
},
},
],
sourceFlowConfig: {
connectorType: "Salesforce",
connectorProfileName: props.salesforceConnectorProfile,
sourceConnectorProperties: {
salesforce: {
object: FLOW_CONFIG.objectName,
// 新しく追加されたフィールドを自動的にインポート
enableDynamicFieldUpdate: false,
includeDeletedRecords: true,
},
},
incrementalPullConfig: {
datetimeTypeFieldName: "LastModifiedDate",
},
},
tasks: [
{
taskType: "Map_all",
sourceFields: [],
taskProperties: [],
},
],
triggerConfig: {
triggerType: "Scheduled",
triggerProperties: {
scheduleStartTime: Math.floor(
new Date(FLOW_CONFIG.schedule.startTime).getTime() / 1000
),
scheduleExpression: FLOW_CONFIG.schedule.expression,
timeZone: "Asia/Tokyo",
scheduleOffset: FLOW_CONFIG.schedule.offset,
dataPullMode: "Incremental",
},
},
flowStatus: FLOW_CONFIG.flowStatus,
});
// AppFlow Failure EventBridgeルールの作成
const appFlowFailureRule = new events.Rule(this, "AppFlowFailureRule", {
eventPattern: {
source: ["aws.appflow"],
detailType: ["AppFlow End Flow Run Report"],
detail: {
"flow-name": [appFlowName],
status: ["Execution Failed"],
},
},
});
// EventBridgeルールのターゲットとしてSNSトピックを設定
appFlowFailureRule.addTarget(
new targets.SnsTopic(props.errorNotificationTopic, {
message: events.RuleTargetInput.fromText(
`AppFlow execution failed for flow: ${appFlowName}. Please check the AWS AppFlow console for more details.`
),
})
);
}
}
salesforce-account-flow.ts
では、Salesforceのオブジェクト連携が増えて、AppFlowを追加する必要があることを想定し、FLOW_CONFIG
に設定項目をまとめています。AppFlowの設定としては、CDKでのデプロイ時にstartTime
で過去日を設定しているとエラーとなるため、未来日を設定します。includeDeletedRecords
の値をtrue
とすることで削除したレコードが存在する場合は、isdeleted
カラムがtrue
のレコードが連携されます。datetimeTypeFieldName
で増分転送用のカラムをLastModifiedDate
に設定しています。taskType
でMap_all
を選択することで抽出カラムが全カラムとなります。
lib/salesforce-appflow-stack.ts
import * as cdk from "aws-cdk-lib";
import type * as athena from "aws-cdk-lib/aws-athena";
import * as s3 from "aws-cdk-lib/aws-s3";
import * as sns from "aws-cdk-lib/aws-sns";
import type { Construct } from "constructs";
import { SalesforceAccountFlow } from "./salesforce-account-flow";
export interface AppFlowStackProps extends cdk.StackProps {
envName: string;
projectName: string;
}
export class AppFlowStack extends cdk.Stack {
public readonly outDataBucket: s3.IBucket;
public readonly errorNotificationTopic: sns.Topic;
public readonly salesforceConnectorProfile: string;
public readonly athenaWorkgroup: athena.CfnWorkGroup;
constructor(scope: Construct, id: string, props: AppFlowStackProps) {
super(scope, id, props);
// 共通リソースの作成
this.outDataBucket = this.createS3Bucket(props);
this.errorNotificationTopic = this.createErrorNotificationTopic(props);
this.salesforceConnectorProfile = `${props.projectName}-${props.envName}-salesforce-flow-connector`;
// Salesforceオブジェクトごとのフロー作成
new SalesforceAccountFlow(this, "AccountFlow", {
envName: props.envName,
projectName: props.projectName,
outDataBucket: this.outDataBucket,
salesforceConnectorProfile: this.salesforceConnectorProfile,
errorNotificationTopic: this.errorNotificationTopic,
});
}
private createS3Bucket(props: AppFlowStackProps): s3.IBucket {
return s3.Bucket.fromBucketName(this, "OutDataBucket", `<your-s3-bucket>`);
}
private createErrorNotificationTopic(props: AppFlowStackProps): sns.Topic {
return new sns.Topic(this, "ErrorNotificationTopic", {
topicName: `${props.projectName}-${props.envName}-error-notification-topic`,
});
}
}
salesforce-appflow-stack.ts
では共通で使用するリソースとSalesforceオブジェクトごとのAppFlowを定義しています。
parameter.ts
import { Environment } from "aws-cdk-lib";
// Parameters for Application
export interface AppParameter {
env: Environment;
envName: string;
projectName: string;
}
// Example
export const devParameter: AppParameter = {
envName: "dev",
projectName: "cm-kasama",
env: {},
// env: { account: "xxxxxx", region: "ap-northeast-1" },
};
export const prodParameter: AppParameter = {
envName: "prod",
projectName: "cm-kasama",
env: {},
// env: { account: "xxxxxx", region: "ap-northeast-1" },
};
parameter.ts
では環境ごとに活用するparameterを定義しています。
デプロイ
S3 デプロイ
S3はyamlファイルで定義しているのでCloudFormationでデプロイします。AWS Management Console上のCloudFormationを選択し、スタックの作成
からテンプレートファイルをアップロード
でyamlファイルをアップロードし、スタックを作成します。
Databse, table作成
Athena上でDDLを実行し、tableを作成します。
DDL
CREATE DATABASE IF NOT EXISTS cm_kasama_salesforce_dev_db;
CREATE EXTERNAL TABLE cm_kasama_salesforce_dev_db.raw_account(
Id STRING,
IsDeleted BOOLEAN,
MasterRecordId STRING,
Name STRING,
Type STRING,
ParentId STRING,
BillingStreet STRING,
BillingCity STRING,
BillingState STRING,
BillingPostalCode STRING,
BillingCountry STRING,
BillingLatitude DOUBLE,
BillingLongitude DOUBLE,
BillingGeocodeAccuracy STRING,
BillingAddress STRING,
ShippingStreet STRING,
ShippingCity STRING,
ShippingState STRING,
ShippingPostalCode STRING,
ShippingCountry STRING,
ShippingLatitude DOUBLE,
ShippingLongitude DOUBLE,
ShippingGeocodeAccuracy STRING,
ShippingAddress STRING,
Phone STRING,
Fax STRING,
AccountNumber STRING,
Website STRING,
PhotoUrl STRING,
Sic STRING,
Industry STRING,
AnnualRevenue STRING,
NumberOfEmployees INT,
Ownership STRING,
TickerSymbol STRING,
Description STRING,
Rating STRING,
Site STRING,
OwnerId STRING,
CreatedDate TIMESTAMP,
CreatedById STRING,
LastModifiedDate TIMESTAMP,
LastModifiedById STRING,
SystemModstamp TIMESTAMP,
LastActivityDate TIMESTAMP,
LastViewedDate TIMESTAMP,
LastReferencedDate TIMESTAMP,
Jigsaw STRING,
JigsawCompanyId STRING,
CleanStatus STRING,
AccountSource STRING,
DunsNumber STRING,
Tradestyle STRING,
NaicsCode STRING,
NaicsDesc STRING,
YearStarted STRING,
SicDesc STRING,
DandbCompanyId STRING,
OperatingHoursId STRING,
CustomerPriority__c STRING,
SLA__c STRING,
Active__c STRING,
NumberofLocations__c DOUBLE,
UpsellOpportunity__c STRING,
SLASerialNumber__c STRING,
SLAExpirationDate__c TIMESTAMP
)
PARTITIONED BY (
year string,
month string,
day string
)
STORED AS PARQUET
LOCATION 's3://<your-s3-bucket>/cm-kasama-dev-sf-account-flow/'
TBLPROPERTIES (
'projection.enabled' = 'true',
'projection.year.type' = 'date',
'projection.year.format' = 'yyyy',
'projection.year.range' = '2024,NOW',
'projection.month.type' = 'integer',
'projection.month.range' = '1,12',
'projection.month.digits' = '2',
'projection.day.type' = 'integer',
'projection.day.range' = '1,31',
'projection.day.digits' = '2',
'storage.location.template' = 's3://<your-s3-bucket>/cm-kasama-dev-sf-account-flow/${year}/${month}/${day}'
);
CDK デプロイ
package.jsonがあるディレクトリで依存関係をインストールします。
npm install
同じくcdk.jsonがあるディレクトリでデプロイコマンドを実行します。--all
はCDKアプリケーションに含まれる全てのスタックをデプロイするためのオプション、--require-approval never
はセキュリティ的に敏感な変更やIAMリソースの変更を含むデプロイメント時の承認を求めるダイアログ表示を完全にスキップします。never
は、どんな変更でも事前確認なしにデプロイすることを意味します。今回は検証用なので指定していますが、慎重にデプロイする場合は必要のないオプションになるかもしれません。-c
でenvironment
を指定し、環境に合わせたデプロイを行います。
npx cdk deploy --all --require-approval never -c environment=dev --profile <YOUR_AWS_PROFILE>
実行
正常系
デプロイされたAppFlowをオンデマンドモードに修正し手動で実行しました。成功で終了しました。
Athena上でもtableに対してSelectしたところ15件出力されていました。
異常系
次に異常系を確認します。
最初に作成されたSNS TOPICに対してメールアドレスでサブスクリプション登録します。
AppFlowで異常を発生させるためにS3 Bucket policyを手動で削除します。
この状態でAppFlow実行し、想定通りエラーとなりました。
EventBridgeトリガーでSNS TOPICから通知も来ています。
Amazon AppFlowの注意事項
実装や試験を進める中で何点かつまづいたポイントがあったので、記載しておきます。
増分転送時の初回連携分は30日間のデータとなる
AWSの公式サイトにも記載がありますが、増分転送モードの場合、最初のスケジュールによってトリガーされたフローは、過去 30 日間のレコードを取得します。
When you select incremental transfer, Amazon AppFlow transfers only the records that have been added or changed since the last successful flow run. You can also select a source timestamp field to specify how Amazon AppFlow identifies new or changed records. For example, if you have a Created Date timestamp field, choose this to instruct Amazon AppFlow to transfer only newly-created records (and not changed records) since the last successful flow run. The first schedule-triggered flow will pull 30 days of past records at the time of the first flow run.
試しに初回を増分転送にして、スケジュール実行します。
最終更新日が30日以内のデータは1件です。
AppFlowは成功しましたが、処理されたレコード件数は1件です。
Athenaでクエリも実行しましたが、データとしては1件のみのため、やはり30日間のデータしか取得できないようでした。そのため、初回連携時に全件を取得したい場合は、スケジュール実行の完全転送
モードで実行することが良いと思います。
AppFlowで作成されるpartitionはUTC
試しに分でのpartition設定としてオンデマンド実行します。
AppFlowはJSTの2025年1月6日 08:01に実行しました。
S3 URIを確認すると2025年1月5日 23:01となっていますのでUTCとなります。
削除レコードの扱い
AppFlowのその他の設定で削除されたレコードのインポート
にチェックが入っていると削除レコードも増分として転送されます。
試しにSalesforce上でレコードを1件削除します。
Salesforceではレコードを削除すると15日以内はごみ箱に物理削除待ちレコードとして登録されます。15日を経過するとそのデータは自動的に完全に削除(物理削除)されます。
AppFlowを実行し、出力されたデータをAthenaで参照しました。isdeleted
カラムがtrue
で連携されています。このカラムを用いて削除されているか否かを判断できます。
AppFlowのSalesforce連携ではスケジュール実行は1分あたり1回
AWS公式サイトにも記載がありますが、AppFlowのSalesforce連携では1分間に最大1回のフロー実行となります。二つ同時に実行してみたところ、片方はスケジュール時間通りに動作し、片方は2,3分遅れて動作しました。取り扱うデータ量によって時間差があると思いますのであくまで参考程度にしていただければと思います。実際にプロジェクトで導入する際は、どのくらい時間差があるか、差分データが過不足なく取得できているかの検証が必要だと思います。
Salesforce: Maximum frequency of one flow run per minute
最後に
AppFlowで簡単にデータロードはできるので、複雑な加工要件が無ければ一つの選択肢として良いと思いました!